Kafka 用クライアントの Kafdrop で Aiven for Apache Kafka に接続する
ウィスキー、シガー、パイプをこよなく愛する大栗です。
クラウド データ プラットフォームの Aiven はデータベースだけでなく、Kafka のような他のデータ処理機能も利用可能です。Kafka 用のクライアントツール である Kafdrop で接続してみます。
Kafdrop?
Kafdrop は Kafka 用の Web UI でブローカーやトピックなどの情報やメッセージの表示を行えます。
やってみる
Aiven for Apache Kafka を起動する
Aiven for Apache Kafka の起動は以下のエントリを参考にしてください。
サービスが起動したら、すぐにアクセスを許可する IP アドレスを絞ります。まず以下のようなサービスでアクセス元のグローバル IP アドレスを確認します。
次に Aiven Console で Kafka のサービスのOverview
タブにあるAllowed IP Addresses
を確認します。おそらくデフォルトの0.0.0.0/0
になっています。何処からでもアクセス可能な状態なので自分の IP アドレスからだけ接続できるように設定しましょう。Change
をクリックします。
先に設定されている0.0.0.0/0
を消して、先程調べた自分の IP アドレスを設定します。設定したらSave Changes
をクリックして確定します。
これで、取り敢えずはましになりました。
Kafka の接続情報を取得する
次に Aiven Console で Kafka のサービスのOverview
タブにあるConnection information
を確認して認証情報をダウンロードします。Access Key はservice.key
、Access Certificate はservice.cert
、CA Certificate はca.pem
というファイル名でローカルマシンに保存します。
ダウンロードしたファイルを openssl コマンドで pkcs12 形式に変換します。
$ openssl pkcs12 -export -inkey service.key -in service.cert -out kafka.keystore.p12 -name demo_kafka_key Enter Export Password: # パスワードを設定 Verifying - Enter Export Password: # パスワードを再度入力
keytool コマンドで CA 証明書をキーストアにロードします。
$ keytool -import -file ca.pem -alias KafkaCA -keystore kafka-client.truststore.jks Enter keystore password: # 6文字以上でパスワードを設定 Re-enter new password: # パスワードを再度入力 Owner: CN=abcd1234-abcd-1234-abcd-12345678abcd Project CA Issuer: CN=abcd1234-abcd-1234-abcd-12345678abcd Project CA Serial number: 1a2b3c4d5e6f7g8h1a2b3c4d5e6f7g8h1a2b3c4d Valid from: Thu Jan 06 04:43:48 UTC 2022 until: Sun Jan 04 04:43:48 UTC 2032 Certificate fingerprints: SHA1: 1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A SHA256: 1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A:1A Signature algorithm name: SHA384withRSA Subject Public Key Algorithm: 3072-bit RSA key Version: 3 Extensions: #1: ObjectId: 2.5.29.19 Criticality=false BasicConstraints:[ CA:true PathLen:0 ] #2: ObjectId: 2.5.29.15 Criticality=false KeyUsage [ Key_CertSign Crl_Sign ] #3: ObjectId: 2.5.29.14 Criticality=false SubjectKeyIdentifier [ KeyIdentifier [ 0000: 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A 1A RLL..N...-.k.0.3 0010: 1A 1A 1A 1A ./g; ] ] Trust this certificate? [no]: y # y を入力 Certificate was added to keystore
Kafka に接続するための設定ファイルkafka.properties
を作成します
security.protocol=SSL ssl.keystore.password=$PASSWD # openssl コマンドで設定したパスワードを入力 ssl.keystore.type=PKCS12 ssl.truststore.password=$TRUST-PASSWD # keytool コマンドで設定したパスワードを入力
これで接続するためのファイルであるkafka.keystore.p12
、kafka-client.truststore.jks``kafka.properties
を作成しました。
Kafdrop で Kafka に接続する
Kafdrop を起動しますが、ここでは Docker を使用します。
$ ::: ::: ::: :::::::::: ::::::::: ::::::::: :::::::: ::::::::: :::::::: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: :+: +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +:+ +#++:++ +#++:++#++: :#::+::# +#+ +:+ +#++:++#: +#+ +:+ +#++:++#+ +#++: +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ +#+ #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# #+# ### ### ### ### ### ######### ### ### ######## ### ######## Writing Kafka properties into kafka.properties Writing Kafka truststore into kafka.truststore.jks Writing Kafka keystore into kafka.keystore.jks 2022-01-12 08:42:26.312 INFO ${sys:PID} [ main] k.Kafdrop$EnvironmentSetupListener : Initializing JAAS config 2022-01-12 08:42:26.325 INFO ${sys:PID} [ main] k.Kafdrop$EnvironmentSetupListener : env: null .isSecured kafka: false 2022-01-12 08:42:26.326 INFO ${sys:PID} [ main] k.Kafdrop$EnvironmentSetupListener : Env: null 2022-01-12 08:42:26.547 INFO 1 [kground-preinit] o.h.v.i.u.Version : HV000001: Hibernate Validator 6.2.0.Final ・ ・ ・
これで Kafdrop が起動しているのでアクセスしてみます。ポート 9000 でコンテナを起動しているので http://localhost:9000
にアクセスします。Kafdrop でトピックを新しく作成してみましょう。一番下にある+ New
をクリックします。
Topic name
にトピック名を設定します。ここではTestTopic1
とします。入力したら+Create
をクリックします。
成功すると一番下に Successfully create topic TestTopic1 と出てきます。TestTopic1がリンクになっているのでクリックします。
このように Kafdrop 経由で Kafka の操作可能です。
さいごに
著者は Kafka を触ったことがなくどう使ったら良いかよく分かっていないのですが、Kafdrop で Kafka の状態を確認するのが容易になりました。これを機会にもう少し Kafka を触っていこうと思います。